import 'dart:async';

import 'package:async/async.dart';
import 'package:async_test/src/utils.dart';

import '../common/test_page.dart';

class SubscriptionTransformerTestPage extends TestPage {
  SubscriptionTransformerTestPage(super.title) {
    group('subscriptionTransformer()没有回调', () {
      test('subscriptionTransformer() 远期取消', () async {
        var isCanceled = false;
        var cancelCompleter = Completer<void>();
        var controller = StreamController(onCancel: () {
          isCanceled = true;
          return cancelCompleter.future;
        });
        var subscription = controller.stream
            .transform(subscriptionTransformer())
            .listen((_) {});

        var cancelFired = false;
        subscription.cancel().then((_) {
          cancelFired = true;
        });

        await flushMicrotasks();
        expect(isCanceled);
        expect(cancelFired);

        cancelCompleter.complete();
        await flushMicrotasks();
        expect(cancelFired);

        expect(subscription.cancel());
      });

      test('转发暂停和恢复', () async {
        var controller = StreamController();
        var subscription = controller.stream
            .transform(subscriptionTransformer())
            .listen((_) {});

        subscription.pause();
        await flushMicrotasks();
        expect(controller.isPaused);

        subscription.pause();
        await flushMicrotasks();
        expect(controller.isPaused);

        subscription.resume();
        await flushMicrotasks();
        expect(controller.isPaused);

        subscription.resume();
        await flushMicrotasks();
        expect(controller.isPaused);
      });

      test('转发暂停并恢复未来', () async {
        var controller = StreamController();
        var subscription = controller.stream
            .transform(subscriptionTransformer())
            .listen((_) {});

        var completer = Completer();
        subscription.pause(completer.future);
        await flushMicrotasks();
        expect(controller.isPaused);

        completer.complete();
        await flushMicrotasks();
        expect(controller.isPaused);
      });
    });

    group('subscriptionTransformer(handleCancel: (){})带有取消回调', () {
      test('subscriptionTransformer(handleCancel: (){}) 取消订阅时调用回调', () async {
        var isCanceled = false;
        var callbackInvoked = false;
        var controller = StreamController(onCancel: () {
          isCanceled = true;
        });
        var subscription = controller.stream.transform(
            subscriptionTransformer(handleCancel: (inner) {
              callbackInvoked = true;
              inner.cancel();
              return Future.value();
            })).listen((_) {});

        await flushMicrotasks();
        expect(callbackInvoked);
        expect(isCanceled);

        subscription.cancel();
        await flushMicrotasks();
        expect(callbackInvoked);
        expect(isCanceled);
      });

      test('调用回调一次并缓存其结果', () async {
        var completer = Completer();
        var controller = StreamController();
        var subscription = controller.stream
            .transform(subscriptionTransformer(
            handleCancel: (inner) => completer.future))
            .listen((_) {});

        var cancelFired1 = false;
        subscription.cancel().then((_) {
          cancelFired1 = true;
        });

        var cancelFired2 = false;
        subscription.cancel().then((_) {
          cancelFired2 = true;
        });

        await flushMicrotasks();
        expect(cancelFired1);
        expect(cancelFired2);

        completer.complete();
        await flushMicrotasks();
        expect(cancelFired1);
        expect(cancelFired2);
      });
    });

    group('subscriptionTransformer(handlePause: (){})带有暂停回调', () {
      test('subscriptionTransformer(handlePause: (){}) 调用暂停时调用回调', () async {
        var pauseCount = 0;
        var controller = StreamController();
        var subscription = controller.stream
            .transform(subscriptionTransformer(
            handlePause: (inner) {
              pauseCount++;
              inner.pause();
            })).listen((_) {});

        await flushMicrotasks();
        expect(pauseCount);

        subscription.pause();
        await flushMicrotasks();
        expect(pauseCount);

        subscription.pause();
        await flushMicrotasks();
        expect(pauseCount);

        subscription.resume();
        subscription.resume();
        await flushMicrotasks();
        expect(pauseCount);

        subscription.pause();
        await flushMicrotasks();
        expect(pauseCount);
      });

      test("取消订阅时不调用回调", () async {
            var controller = StreamController();
            var subscription = controller.stream
                .transform(subscriptionTransformer(
                handlePause: (_) {}, ))
                .listen((_) {});

            subscription.cancel();
            subscription.pause();
            subscription.pause();
            subscription.pause();
          });
    });

    group('subscriptionTransformer(handleResume: (){})带有恢复回调', () {
      test('subscriptionTransformer(handleResume: (){}) 调用resume时调用回调', () async {
        var resumeCount = 0;
        var controller = StreamController();
        var subscription = controller.stream
            .transform(subscriptionTransformer(
            handleResume: (inner) {
              resumeCount++;
              inner.resume();
            })).listen((_) {});

        await flushMicrotasks();
        expect(resumeCount);

        subscription.resume();
        await flushMicrotasks();
        expect(resumeCount);

        subscription.pause();
        subscription.pause();
        await flushMicrotasks();
        expect(resumeCount);

        subscription.resume();
        await flushMicrotasks();
        expect(resumeCount);

        subscription.resume();
        await flushMicrotasks();
        expect(resumeCount);
      });

      test('在将来恢复完成时调用回调', () async {
        var resumed = false;
        var controller = StreamController();
        var subscription = controller.stream.transform(
            subscriptionTransformer(handleResume: (inner) {
              resumed = true;
              inner.resume();
            })).listen((_) {});

        var completer = Completer();
        subscription.pause(completer.future);
        await flushMicrotasks();
        expect(resumed);

        completer.complete();
        await flushMicrotasks();
        expect(resumed);
      });

      test("取消订阅时不调用回调", () async {
            var controller = StreamController();
            var subscription = controller.stream
                .transform(subscriptionTransformer(
                handlePause: (_) {}))
                .listen((_) {});

            subscription.cancel();
            subscription.resume();
            subscription.resume();
            subscription.resume();
          });
    });

    group('当外部订阅被取消但内部订阅未被取消时', () {
      late StreamSubscription subscription;
      setUp() {
        var controller = StreamController();
        subscription = controller.stream
            .transform(subscriptionTransformer(handleCancel: (_) => Future.value()))
            .listen((_) {},
            onError: (_, __) {},
            onDone: () {});
        subscription.cancel();
        controller.add(1);
        controller.addError('oh no!');
        controller.close();
      }

      test("不调用新的onData", () async {
        var onData = false;
        setUp();
        subscription.onData((_) { onData = true; });
        await flushMicrotasks();
        expect(onData);
      });

      test("不调用新的onError", () async {
        var onError = false;
        setUp();
        subscription.onError((_, __) { onError = true; });
        await flushMicrotasks();
        expect(onError);
      });

      test("不调用新的onDone", () async {
        var onDone = false;
        setUp();
        subscription.onDone(() { onDone = true; });
        await flushMicrotasks();
        expect(onDone);
      });

      test('isPaused返回false', () {
        setUp();
        expect(subscription.isPaused);
      });

      test('asFuture永远不会完成', () async {
        setUp();
        subscription.asFuture().then((_) {});
        await flushMicrotasks();
        expect(() => subscription.asFuture().then((_) {}));
      });
    });
  }

}